/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.iceberg;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.RuntimeUnit;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.predicate.Range;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.DecimalType;
import com.facebook.presto.common.type.FixedWidthType;
import com.facebook.presto.common.type.KllSketchType;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.hive.NodeVersion;
import com.facebook.presto.iceberg.statistics.KllHistogram;
import com.facebook.presto.iceberg.statistics.StatisticsFileCache;
import com.facebook.presto.iceberg.statistics.StatisticsFileCacheKey;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.statistics.ColumnStatisticMetadata;
import com.facebook.presto.spi.statistics.ColumnStatisticType;
import com.facebook.presto.spi.statistics.ColumnStatistics;
import com.facebook.presto.spi.statistics.ComputedStatistics;
import com.facebook.presto.spi.statistics.DisjointRangeDomainHistogram;
import com.facebook.presto.spi.statistics.DoubleRange;
import com.facebook.presto.spi.statistics.Estimate;
import com.facebook.presto.spi.statistics.TableStatistics;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.CompactSketch;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.BlobMetadata;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinReader;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.DateType.DATE;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
import static com.facebook.presto.common.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static com.facebook.presto.common.type.TypeUtils.isNumericType;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.common.type.Varchars.isVarcharType;
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getStatisticSnapshotRecordDifferenceWeight;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getStatisticsKllSketchKParameter;
import static com.facebook.presto.iceberg.IcebergUtil.getIdentityPartitions;
import static com.facebook.presto.iceberg.Partition.toMap;
import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
import static com.facebook.presto.iceberg.statistics.KllHistogram.isKllHistogramSupportedType;
import static com.facebook.presto.iceberg.util.StatisticsUtil.calculateAndSetTableSize;
import static com.facebook.presto.iceberg.util.StatisticsUtil.formatIdentifier;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.HISTOGRAM;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES;
import static com.facebook.presto.spi.statistics.SourceInfo.ConfidenceLevel.HIGH;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.collect.Iterators.getOnlyElement;
import static java.lang.Long.parseLong;
import static java.lang.Math.abs;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static java.util.stream.Collectors.toSet;
import static org.apache.iceberg.SnapshotSummary.TOTAL_RECORDS_PROP;

public class TableStatisticsMaker
{
    private static final Logger log = Logger.get(TableStatisticsMaker.class);
    private static final String ICEBERG_THETA_SKETCH_BLOB_TYPE_ID = "apache-datasketches-theta-v1";
    private static final String ICEBERG_DATA_SIZE_BLOB_TYPE_ID = "presto-sum-data-size-bytes-v1";
    private static final String ICEBERG_KLL_SKETCH_BLOB_TYPE_ID = "presto-kll-sketch-bytes-v1";
    private static final String ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY = "ndv";
    private static final String ICEBERG_DATA_SIZE_BLOB_PROPERTY_KEY = "data_size";
    private final Table icebergTable;
    private final ConnectorSession session;
    private final TypeManager typeManager;

    private static final String STATISITCS_CACHE_METRIC_FILE_SIZE_FORMAT = "StatisticsFileCache/PuffinFileSize/%s/%s";
    private static final String STATISITCS_CACHE_METRIC_FILE_COLUMN_COUNT_FORMAT = "StatisticsFileCache/ColumnCount/%s/%s";
    private static final String STATISITCS_CACHE_METRIC_PARTIAL_MISS_FORMAT = "StatisticsFileCache/PartialMiss/%s/%s";

    private TableStatisticsMaker(Table icebergTable, ConnectorSession session, TypeManager typeManager)
    {
        this.icebergTable = requireNonNull(icebergTable, "icebergTable is null");
        this.session = requireNonNull(session, "session is null");
        this.typeManager = requireNonNull(typeManager, "typeManager is null");
    }

    private static final Map<ColumnStatisticType, PuffinBlobGenerator> puffinStatWriters = ImmutableMap.<ColumnStatisticType, PuffinBlobGenerator>builder()
            .put(NUMBER_OF_DISTINCT_VALUES, TableStatisticsMaker::generateNDVBlob)
            .put(TOTAL_SIZE_IN_BYTES, TableStatisticsMaker::generateStatSizeBlob)
            .put(HISTOGRAM, TableStatisticsMaker::generateKllSketchBlob)
            .build();

    private static final Map<String, PuffinBlobReader> puffinStatReaders = ImmutableMap.<String, PuffinBlobReader>builder()
            .put(ICEBERG_THETA_SKETCH_BLOB_TYPE_ID, TableStatisticsMaker::readNDVBlob)
            .put(ICEBERG_DATA_SIZE_BLOB_TYPE_ID, TableStatisticsMaker::readDataSizeBlob)
            .put(ICEBERG_KLL_SKETCH_BLOB_TYPE_ID, TableStatisticsMaker::readKllSketchBlob)
            .build();

    public static TableStatistics getTableStatistics(
            ConnectorSession session,
            TypeManager typeManager,
            StatisticsFileCache statisticsFileCache,
            Optional<TupleDomain<IcebergColumnHandle>> currentPredicate,
            Constraint constraint,
            IcebergTableHandle tableHandle,
            Table icebergTable,
            List<IcebergColumnHandle> columns)
    {
        return new TableStatisticsMaker(icebergTable, session, typeManager).makeTableStatistics(statisticsFileCache, tableHandle, currentPredicate, constraint, columns);
    }

    private TableStatistics makeTableStatistics(StatisticsFileCache statisticsFileCache,
            IcebergTableHandle tableHandle,
            Optional<TupleDomain<IcebergColumnHandle>> currentPredicate,
            Constraint constraint,
            List<IcebergColumnHandle> selectedColumns)
    {
        if (!tableHandle.getIcebergTableName().getSnapshotId().isPresent() || constraint.getSummary().isNone()) {
            return TableStatistics.builder()
                    .setRowCount(Estimate.of(0))
                    .build();
        }

        TupleDomain<IcebergColumnHandle> intersection = constraint.getSummary()
                .transform(IcebergColumnHandle.class::cast);
        if (currentPredicate.isPresent()) {
            intersection.intersect(currentPredicate.get());
        }

        if (intersection.isNone()) {
            return TableStatistics.builder()
                    .setRowCount(Estimate.of(0))
                    .setConfidenceLevel(HIGH)
                    .build();
        }

        List<Types.NestedField> columns = icebergTable.schema().columns();

        Map<Integer, Type.PrimitiveType> idToTypeMapping = columns.stream()
                .filter(column -> column.type().isPrimitiveType())
                .collect(Collectors.toMap(Types.NestedField::fieldId, column -> column.type().asPrimitiveType()));
        List<PartitionField> partitionFields = icebergTable.spec().fields();

        Set<Integer> identityPartitionIds = getIdentityPartitions(icebergTable.spec()).keySet().stream()
                .map(PartitionField::sourceId)
                .collect(toSet());

        List<Types.NestedField> nonPartitionPrimitiveColumns = columns.stream()
                .filter(column -> !identityPartitionIds.contains(column.fieldId()) && column.type().isPrimitiveType())
                .collect(toImmutableList());

        Partition summary;
        if (tableHandle.getIcebergTableName().getTableType() == IcebergTableType.EQUALITY_DELETES) {
            summary = getEqualityDeleteTableSummary(tableHandle, intersection, idToTypeMapping, nonPartitionPrimitiveColumns, partitionFields);
        }
        else {
            summary = getDataTableSummary(tableHandle, selectedColumns, intersection, idToTypeMapping, nonPartitionPrimitiveColumns, partitionFields);
        }

        if (summary == null) {
            return TableStatistics.builder()
                    .setRowCount(Estimate.of(0))
                    .build();
        }
        // the total record count for the whole table
        Optional<Long> totalRecordCount = Optional.of(intersection)
                .filter(domain -> !domain.isAll())
                .map(domain -> getDataTableSummary(tableHandle, ImmutableList.of(), TupleDomain.all(), idToTypeMapping, nonPartitionPrimitiveColumns, partitionFields).getRecordCount());

        double recordCount = summary.getRecordCount();
        TableStatistics.Builder result = TableStatistics.builder();
        result.setRowCount(Estimate.of(recordCount));

        // transformValues returns a view. We need to make a copy of the map in order to update
        Map<Integer, ColumnStatistics.Builder> tableStats = ImmutableMap.copyOf(
                Maps.transformValues(getClosestStatisticsFileForSnapshot(tableHandle)
                                .map(file -> loadStatisticsFile(tableHandle, file, statisticsFileCache, selectedColumns.stream()
                                        .map(IcebergColumnHandle::getId)
                                        .collect(Collectors.toList())))
                                .orElseGet(Collections::emptyMap),
                        ColumnStatistics::buildFrom));
        // scale all NDV values loaded from puffin files based on row count
        totalRecordCount.ifPresent(fullTableRecordCount -> tableStats.forEach((id, stat) ->
                stat.setDistinctValuesCount(stat.getDistinctValuesCount().map(value -> value * recordCount / fullTableRecordCount))));

        for (IcebergColumnHandle columnHandle : selectedColumns) {
            int fieldId = columnHandle.getId();
            ColumnStatistics.Builder columnBuilder = tableStats.getOrDefault(fieldId, ColumnStatistics.builder());
            Long nullCount = summary.getNullCounts().get(fieldId);
            if (nullCount != null) {
                columnBuilder.setNullsFraction(Estimate.of(nullCount / recordCount));
            }

            Object min = summary.getMinValues().get(fieldId);
            Object max = summary.getMaxValues().get(fieldId);
            if (min instanceof Number && max instanceof Number) {
                DoubleRange range = new DoubleRange(((Number) min).doubleValue(), ((Number) max).doubleValue());
                columnBuilder.setRange(Optional.of(range));

                // the histogram is generated by scanning the entire dataset. It is possible that
                // the constraint prevents scanning portions of the table. Given that we know the
                // range that the scan provides for a particular column, bound the histogram to the
                // scanned range.

                final DoubleRange histRange = range;
                columnBuilder.setHistogram(columnBuilder.getHistogram()
                        .map(histogram -> DisjointRangeDomainHistogram
                                .addConjunction(histogram, Range.range(DOUBLE, histRange.getMin(), true, histRange.getMax(), true))));
            }
            result.setColumnStatistics(columnHandle, columnBuilder.build());
        }
        return calculateAndSetTableSize(result).build();
    }

    private Partition getDataTableSummary(IcebergTableHandle tableHandle,
            List<IcebergColumnHandle> selectedColumns,
            TupleDomain<IcebergColumnHandle> intersection,
            Map<Integer, Type.PrimitiveType> idToTypeMapping,
            List<Types.NestedField> nonPartitionPrimitiveColumns,
            List<PartitionField> partitionFields)
    {
        TableScan tableScan = icebergTable.newScan()
                .filter(toIcebergExpression(intersection))
                .select(selectedColumns.stream().map(IcebergColumnHandle::getName).collect(Collectors.toList()))
                .useSnapshot(tableHandle.getIcebergTableName().getSnapshotId().get())
                .includeColumnStats();

        CloseableIterable<ContentFile<?>> files = CloseableIterable.transform(tableScan.planFiles(), ContentScanTask::file);
        return getSummaryFromFiles(files, idToTypeMapping, nonPartitionPrimitiveColumns, partitionFields);
    }

    private Partition getEqualityDeleteTableSummary(IcebergTableHandle tableHandle,
            TupleDomain<IcebergColumnHandle> intersection,
            Map<Integer, Type.PrimitiveType> idToTypeMapping,
            List<Types.NestedField> nonPartitionPrimitiveColumns,
            List<PartitionField> partitionFields)
    {
        CloseableIterable<DeleteFile> deleteFiles = IcebergUtil.getDeleteFiles(icebergTable,
                tableHandle.getIcebergTableName().getSnapshotId().get(),
                intersection,
                tableHandle.getPartitionSpecId(),
                tableHandle.getEqualityFieldIds());
        CloseableIterable<ContentFile<?>> files = CloseableIterable.transform(deleteFiles, deleteFile -> deleteFile);
        return getSummaryFromFiles(files, idToTypeMapping, nonPartitionPrimitiveColumns, partitionFields);
    }

    private Partition getSummaryFromFiles(CloseableIterable<ContentFile<?>> files,
            Map<Integer, Type.PrimitiveType> idToTypeMapping,
            List<Types.NestedField> nonPartitionPrimitiveColumns,
            List<PartitionField> partitionFields)
    {
        Partition summary = null;
        try (CloseableIterable<ContentFile<?>> filesHolder = files) {
            for (ContentFile<?> contentFile : filesHolder) {
                if (summary == null) {
                    summary = new Partition(
                            idToTypeMapping,
                            nonPartitionPrimitiveColumns,
                            contentFile.partition(),
                            contentFile.recordCount(),
                            contentFile.fileSizeInBytes(),
                            toMap(idToTypeMapping, contentFile.lowerBounds()),
                            toMap(idToTypeMapping, contentFile.upperBounds()),
                            contentFile.nullValueCounts(),
                            new HashMap<>());
                }
                else {
                    summary.incrementFileCount();
                    summary.incrementRecordCount(contentFile.recordCount());
                    summary.incrementSize(contentFile.fileSizeInBytes());
                    updateSummaryMin(summary, partitionFields, toMap(idToTypeMapping, contentFile.lowerBounds()), contentFile.nullValueCounts(), contentFile.recordCount());
                    updateSummaryMax(summary, partitionFields, toMap(idToTypeMapping, contentFile.upperBounds()), contentFile.nullValueCounts(), contentFile.recordCount());
                    summary.updateNullCount(contentFile.nullValueCounts());
                }
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return summary;
    }

    public static void writeTableStatistics(NodeVersion nodeVersion, TypeManager typeManager, IcebergTableHandle tableHandle, Table icebergTable, ConnectorSession session, Collection<ComputedStatistics> computedStatistics)
    {
        new TableStatisticsMaker(icebergTable, session, typeManager).writeTableStatistics(nodeVersion, typeManager, tableHandle, computedStatistics);
    }

    private void writeTableStatistics(NodeVersion nodeVersion, TypeManager typeManager, IcebergTableHandle tableHandle, Collection<ComputedStatistics> computedStatistics)
    {
        Snapshot snapshot = tableHandle.getIcebergTableName().getSnapshotId().map(icebergTable::snapshot).orElseGet(icebergTable::currentSnapshot);
        if (snapshot == null) {
            // this may occur if the table has not been written to.
            return;
        }
        try (FileIO io = icebergTable.io()) {
            String path = ((HasTableOperations) icebergTable).operations().metadataFileLocation(format("%s-%s.stats", session.getQueryId(), randomUUID()));
            OutputFile outputFile = io.newOutputFile(path);
            try (PuffinWriter writer = Puffin.write(outputFile)
                    .createdBy("presto-" + nodeVersion)
                    .build()) {
                computedStatistics.stream()
                        .map(ComputedStatistics::getColumnStatistics)
                        .filter(Objects::nonNull)
                        .flatMap(map -> map.entrySet().stream())
                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
                        .forEach((key, value) -> {
                            Optional.ofNullable(puffinStatWriters.get(key.getStatisticType()))
                                    .flatMap(generator -> Optional.ofNullable(generator.generate(key, value, icebergTable, snapshot, typeManager)))
                                    .ifPresent(writer::add);
                        });
                writer.finish();
                icebergTable.updateStatistics().setStatistics(
                                snapshot.snapshotId(),
                                new GenericStatisticsFile(
                                        snapshot.snapshotId(),
                                        path,
                                        writer.fileSize(),
                                        writer.footerSize(),
                                        writer.writtenBlobsMetadata().stream()
                                                .map(GenericBlobMetadata::from)
                                                .collect(toImmutableList())))
                        .commit();
            }
            catch (IOException e) {
                log.warn(e, "failed to write table statistics file");
                throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "failed to write statistics file", e);
            }
        }
    }

    @FunctionalInterface
    private interface PuffinBlobGenerator
    {
        @Nullable
        Blob generate(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot, TypeManager typeManager);
    }

    @FunctionalInterface
    private interface PuffinBlobReader
    {
        /**
         * Reads the stats from the blob and then updates the stats builder argument.
         */
        void read(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder stats, Table icebergTable, TypeManager typeManager);
    }

    private static Blob generateNDVBlob(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot, TypeManager typeManager)
    {
        int id = getField(metadata, icebergTable, snapshot).fieldId();
        ByteBuffer raw = VARBINARY.getSlice(value, 0).toByteBuffer();
        CompactSketch sketch = CompactSketch.wrap(Memory.wrap(raw, ByteOrder.nativeOrder()));
        return new Blob(
                ICEBERG_THETA_SKETCH_BLOB_TYPE_ID,
                ImmutableList.of(id),
                snapshot.snapshotId(),
                snapshot.sequenceNumber(),
                raw,
                null,
                ImmutableMap.of(ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY, Long.toString((long) sketch.getEstimate())));
    }

    private static Blob generateStatSizeBlob(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot, TypeManager typeManager)
    {
        int id = getField(metadata, icebergTable, snapshot).fieldId();
        long size = BIGINT.getLong(value, 0);
        return new Blob(
                ICEBERG_DATA_SIZE_BLOB_TYPE_ID,
                ImmutableList.of(id),
                snapshot.snapshotId(),
                snapshot.sequenceNumber(),
                ByteBuffer.allocate(0), // empty bytebuffer since the value is just stored on the blob properties
                null,
                ImmutableMap.of(ICEBERG_DATA_SIZE_BLOB_PROPERTY_KEY, Long.toString(size)));
    }

    private static Blob generateKllSketchBlob(ColumnStatisticMetadata metadata, Block value, Table icebergTable, Snapshot snapshot, TypeManager typeManager)
    {
        Types.NestedField field = getField(metadata, icebergTable, snapshot);
        KllSketchType sketchType = new KllSketchType(toPrestoType(field.type(), typeManager));
        Slice sketchSlice = sketchType.getSlice(value, 0);
        if (value.isNull(0)) {
            // this can occur when all inputs to the sketch are null
            return null;
        }
        return new Blob(
                ICEBERG_KLL_SKETCH_BLOB_TYPE_ID,
                ImmutableList.of(field.fieldId()),
                snapshot.snapshotId(),
                snapshot.sequenceNumber(),
                sketchSlice.toByteBuffer(),
                null,
                ImmutableMap.of());
    }

    private static void readNDVBlob(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder statistics, Table icebergTable, TypeManager typeManager)
    {
        Optional.ofNullable(metadata.properties().get(ICEBERG_THETA_SKETCH_BLOB_PROPERTY_NDV_KEY))
                .ifPresent(ndvProp -> {
                    try {
                        long ndv = parseLong(ndvProp);
                        statistics.setDistinctValuesCount(Estimate.of(ndv));
                    }
                    catch (NumberFormatException e) {
                        statistics.setDistinctValuesCount(Estimate.unknown());
                        log.warn("bad long value when parsing NDVs for statistics file blob %s. bad value: %d", metadata.type(), ndvProp);
                    }
                });
    }

    private static void readDataSizeBlob(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder statistics, Table icebergTable, TypeManager typeManager)
    {
        Optional.ofNullable(metadata.properties().get(ICEBERG_DATA_SIZE_BLOB_PROPERTY_KEY))
                .ifPresent(sizeProp -> {
                    try {
                        long size = parseLong(sizeProp);
                        statistics.setDataSize(Estimate.of(size));
                    }
                    catch (NumberFormatException e) {
                        statistics.setDataSize(Estimate.unknown());
                        log.warn("bad long value when parsing data size from statistics file blob %s. bad value: %d", metadata.type(), sizeProp);
                    }
                });
    }

    private static void readKllSketchBlob(BlobMetadata metadata, ByteBuffer blob, ColumnStatistics.Builder statistics, Table icebergTable, TypeManager typeManager)
    {
        statistics.setHistogram(Optional.ofNullable(icebergTable.schemas().get(icebergTable.snapshot(metadata.snapshotId()).schemaId()))
                .map(schema -> toPrestoType(schema.findType(getOnlyElement(metadata.inputFields().iterator())), typeManager))
                .map(prestoType -> new KllHistogram(Slices.wrappedBuffer(blob), prestoType)));
    }

    private static Types.NestedField getField(ColumnStatisticMetadata metadata, Table icebergTable, Snapshot snapshot)
    {
        return Optional.ofNullable(icebergTable.schemas().get(snapshot.schemaId()))
                .map(schema -> schema.findField(metadata.getColumnName()))
                .orElseThrow(() -> {
                    log.warn("failed to find column name %s in schema of table %s", metadata.getColumnName(), icebergTable.name());
                    return new PrestoException(ICEBERG_INVALID_METADATA, format("failed to find column name %s in schema of table %s", metadata.getColumnName(), icebergTable.name()));
                });
    }

    private void updateSummaryMin(Partition summary, List<PartitionField> partitionFields, Map<Integer, Object> lowerBounds, Map<Integer, Long> nullCounts, long recordCount)
    {
        summary.updateStats(summary.getMinValues(), lowerBounds, nullCounts, recordCount, i -> (i > 0));
        updatePartitionedStats(summary, partitionFields, summary.getMinValues(), lowerBounds, i -> (i > 0));
    }

    private void updateSummaryMax(Partition summary, List<PartitionField> partitionFields, Map<Integer, Object> upperBounds, Map<Integer, Long> nullCounts, long recordCount)
    {
        summary.updateStats(summary.getMaxValues(), upperBounds, nullCounts, recordCount, i -> (i < 0));
        updatePartitionedStats(summary, partitionFields, summary.getMaxValues(), upperBounds, i -> (i < 0));
    }

    private void updatePartitionedStats(
            Partition summary,
            List<PartitionField> partitionFields,
            Map<Integer, Object> current,
            Map<Integer, Object> newStats,
            Predicate<Integer> predicate)
    {
        for (PartitionField field : partitionFields) {
            int id = field.sourceId();
            if (summary.getCorruptedStats().contains(id)) {
                continue;
            }

            Object newValue = newStats.get(id);
            if (newValue == null) {
                continue;
            }

            Object oldValue = current.putIfAbsent(id, newValue);
            if (oldValue != null) {
                Comparator<Object> comparator = Comparators.forType(summary.getIdToTypeMapping().get(id));
                if (predicate.test(comparator.compare(oldValue, newValue))) {
                    current.put(id, newValue);
                }
            }
        }
    }

    private Optional<StatisticsFile> getClosestStatisticsFileForSnapshot(IcebergTableHandle handle)
    {
        Snapshot target = handle.getIcebergTableName().getSnapshotId().map(icebergTable::snapshot).orElseGet(icebergTable::currentSnapshot);
        return icebergTable.statisticsFiles()
                .stream()
                .min((first, second) -> {
                    if (first == second) {
                        return 0;
                    }
                    if (icebergTable.snapshot(first.snapshotId()) == null) {
                        return 1;
                    }
                    if (icebergTable.snapshot(second.snapshotId()) == null) {
                        return -1;
                    }
                    Snapshot firstSnap = icebergTable.snapshot(first.snapshotId());
                    Snapshot secondSnap = icebergTable.snapshot(second.snapshotId());
                    long firstDiff = abs(target.timestampMillis() - firstSnap.timestampMillis());
                    long secondDiff = abs(target.timestampMillis() - secondSnap.timestampMillis());

                    // check if total-record exists
                    Optional<Long> targetTotalRecords = Optional.ofNullable(target.summary().get(TOTAL_RECORDS_PROP)).map(Long::parseLong);
                    Optional<Long> firstTotalRecords = Optional.ofNullable(firstSnap.summary().get(TOTAL_RECORDS_PROP))
                            .map(Long::parseLong);
                    Optional<Long> secondTotalRecords = Optional.ofNullable(secondSnap.summary().get(TOTAL_RECORDS_PROP))
                            .map(Long::parseLong);

                    if (targetTotalRecords.isPresent() && firstTotalRecords.isPresent() && secondTotalRecords.isPresent()) {
                        long targetTotal = targetTotalRecords.get();
                        double weight = getStatisticSnapshotRecordDifferenceWeight(session);
                        firstDiff += (long) (weight * abs(firstTotalRecords.get() - targetTotal));
                        secondDiff += (long) (weight * abs(secondTotalRecords.get() - targetTotal));
                    }

                    return Long.compare(firstDiff, secondDiff);
                });
    }

    /**
     * Builds a map of field ID to ColumnStatistics for a particular {@link StatisticsFile}.
     */
    private Map<Integer, ColumnStatistics> loadStatisticsFile(IcebergTableHandle tableHandle, StatisticsFile file, StatisticsFileCache statisticsFileCache, List<Integer> columnIds)
    {
        // first, try to load all stats from the cache. If the map doesn't contain all keys, load the missing
        // stats into the cache.
        Map<Integer, ColumnStatistics> cachedStats = columnIds.stream()
                .map(id -> Pair.of(id, statisticsFileCache.getIfPresent(new StatisticsFileCacheKey(file, id))))
                .filter(pair -> pair.second() != null)
                .collect(toImmutableMap(Pair::first, Pair::second));
        Set<Integer> missingStats = columnIds.stream().filter(id -> !cachedStats.containsKey(id)).collect(toImmutableSet());
        if (missingStats.isEmpty()) {
            return cachedStats;
        }
        if (!cachedStats.isEmpty()) {
            session.getRuntimeStats().addMetricValue(
                    String.format(STATISITCS_CACHE_METRIC_PARTIAL_MISS_FORMAT,
                            tableHandle.getSchemaTableName(),
                            file.path()),
                    RuntimeUnit.NONE, 1);
        }

        String fileSizeMetricName = String.format(STATISITCS_CACHE_METRIC_FILE_SIZE_FORMAT,
                tableHandle.getSchemaTableName(),
                file.path());
        if (session.getRuntimeStats().getMetric(fileSizeMetricName) == null) {
            long fileSize = file.fileFooterSizeInBytes();
            session.getRuntimeStats().addMetricValue(fileSizeMetricName, RuntimeUnit.NONE, fileSize);
            statisticsFileCache.recordFileSize(fileSize);
        }
        String columnCountMetricName = String.format(STATISITCS_CACHE_METRIC_FILE_COLUMN_COUNT_FORMAT,
                tableHandle.getSchemaTableName(),
                file.path());
        if (session.getRuntimeStats().getMetric(columnCountMetricName) == null) {
            long columnCount = file.blobMetadata().size();
            session.getRuntimeStats().addMetricValue(columnCountMetricName, RuntimeUnit.NONE, columnCount);
            statisticsFileCache.recordColumnCount(columnCount);
        }
        Map<Integer, ColumnStatistics.Builder> result = new HashMap<>();
        try (FileIO io = icebergTable.io()) {
            InputFile inputFile = io.newInputFile(file.path());
            try (PuffinReader reader = Puffin.read(inputFile).build()) {
                for (Pair<BlobMetadata, ByteBuffer> data : reader.readAll(reader.fileMetadata().blobs())) {
                    BlobMetadata metadata = data.first();
                    ByteBuffer blob = data.second();
                    Integer field = getOnlyElement(metadata.inputFields());
                    if (!missingStats.contains(field)) {
                        continue;
                    }

                    Optional.ofNullable(puffinStatReaders.get(metadata.type()))
                            .ifPresent(statReader -> {
                                result.compute(field, (key, value) -> {
                                    if (value == null) {
                                        value = ColumnStatistics.builder();
                                    }
                                    statReader.read(metadata, blob, value, icebergTable, typeManager);
                                    return value;
                                });
                            });
                }
            }
            catch (IOException e) {
                throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "failed to read statistics file at " + file.path(), e);
            }
        }
        Map<Integer, ColumnStatistics> computedResults = new HashMap<>(Maps.transformValues(result, ColumnStatistics.Builder::build));
        missingStats.stream()
                .filter(id -> !computedResults.containsKey(id))
                .forEach(id -> computedResults.put(id, ColumnStatistics.empty()));
        ImmutableMap.Builder<Integer, ColumnStatistics> finalResult = ImmutableMap.builder();
        computedResults.forEach((key, value) -> {
            // stats for a particular column may not appear in a file. Add a cache entry to
            // denote that this file has been read for a particular column to avoid reading the
            // file again
            statisticsFileCache.put(new StatisticsFileCacheKey(file, key), value);
            finalResult.put(key, value);
        });
        finalResult.putAll(cachedStats);
        return finalResult.build();
    }

    public static List<ColumnStatisticMetadata> getSupportedColumnStatistics(ConnectorSession session, String columnName, com.facebook.presto.common.type.Type type)
    {
        ImmutableList.Builder<ColumnStatisticMetadata> supportedStatistics = ImmutableList.builder();
        // all types which support being passed to the sketch_theta function
        if (isNumericType(type) || type.equals(DATE) || isVarcharType(type) ||
                type.equals(TIMESTAMP) ||
                type.equals(TIMESTAMP_WITH_TIME_ZONE)) {
            supportedStatistics.add(NUMBER_OF_DISTINCT_VALUES.getColumnStatisticMetadataWithCustomFunction(
                    columnName, format("RETURN sketch_theta(%s)", formatIdentifier(columnName)), ImmutableList.of(columnName)));
        }

        if (isKllHistogramSupportedType(type)) {
            String histogramFunctionFmt = "RETURN sketch_kll_with_k(%s, CAST(%s as bigint))";
            if (type instanceof DecimalType) {
                histogramFunctionFmt = "RETURN sketch_kll_with_k(CAST(%s as double), CAST(%s as bigint))";
            }
            supportedStatistics.add(HISTOGRAM.getColumnStatisticMetadataWithCustomFunction(columnName,
                    format(histogramFunctionFmt, formatIdentifier(columnName), getStatisticsKllSketchKParameter(session)),
                    ImmutableList.of(columnName)));
        }

        if (!(type instanceof FixedWidthType)) {
            supportedStatistics.add(TOTAL_SIZE_IN_BYTES.getColumnStatisticMetadata(columnName));
        }

        return supportedStatistics.build();
    }
}
